/*
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.jctools.queues;

import org.jctools.queues.IndexedQueueSizeUtil.IndexedQueue;
import org.jctools.util.PortableJvmInfo;
import org.jctools.util.Pow2;
import org.jctools.util.RangeUtil;

import java.util.AbstractQueue;
import java.util.Iterator;
import java.util.NoSuchElementException;

import static org.jctools.queues.LinkedArrayQueueUtil.length;
import static org.jctools.queues.LinkedArrayQueueUtil.modifiedCalcCircularRefElementOffset;
import static org.jctools.util.UnsafeAccess.UNSAFE;
import static org.jctools.util.UnsafeAccess.fieldOffset;
import static org.jctools.util.UnsafeRefArrayAccess.*;


abstract class BaseMpscLinkedArrayQueuePad1<E> extends AbstractQueue<E> implements IndexedQueue {
    long p01, p02, p03, p04, p05, p06, p07;
    long p10, p11, p12, p13, p14, p15, p16, p17;
}

// $gen:ordered-fields
abstract class BaseMpscLinkedArrayQueueProducerFields<E> extends BaseMpscLinkedArrayQueuePad1<E> {
    private final static long P_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueProducerFields.class, "producerIndex");

    private volatile long producerIndex;

    // 获取生产者索引
    // lv = load  valatile
    @Override
    public final long lvProducerIndex() {
        return producerIndex;
    }

    //  so = store orderd
    final void soProducerIndex(long newValue) {
        UNSAFE.putOrderedLong(this, P_INDEX_OFFSET, newValue);
    }

    final boolean casProducerIndex(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, P_INDEX_OFFSET, expect, newValue);
    }
}

abstract class BaseMpscLinkedArrayQueuePad2<E> extends BaseMpscLinkedArrayQueueProducerFields<E> {
    long p01, p02, p03, p04, p05, p06, p07;
    long p10, p11, p12, p13, p14, p15, p16, p17;
}

// $gen:ordered-fields
abstract class BaseMpscLinkedArrayQueueConsumerFields<E> extends BaseMpscLinkedArrayQueuePad2<E> {
    private final static long C_INDEX_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueConsumerFields.class, "consumerIndex");

    private volatile long consumerIndex;  // 计算消费 数组下标
    protected long consumerMask;   // 计算消费 数组下标的掩码
    protected E[] consumerBuffer; // 计算消费 数据的 数组

    // load volatile 消费者索引
    @Override
    public final long lvConsumerIndex() {
        return consumerIndex;
    }

    // load pain
    final long lpConsumerIndex() {
        return UNSAFE.getLong(this, C_INDEX_OFFSET);
    }

    //  store  ordered 消费者索引
    final void soConsumerIndex(long newValue) {
        UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
    }
}

abstract class BaseMpscLinkedArrayQueuePad3<E> extends BaseMpscLinkedArrayQueueConsumerFields<E> {
    long p0, p1, p2, p3, p4, p5, p6, p7;
    long p10, p11, p12, p13, p14, p15, p16, p17;
}

// $gen:ordered-fields
abstract class BaseMpscLinkedArrayQueueColdProducerFields<E> extends BaseMpscLinkedArrayQueuePad3<E> {
    private final static long P_LIMIT_OFFSET = fieldOffset(BaseMpscLinkedArrayQueueColdProducerFields.class, "producerLimit");

    private volatile long producerLimit; //生产上限
    protected long producerMask; // 计算生产者 数组下标的掩码
    protected E[] producerBuffer; // 计算生产者 数据的 数组

    final long lvProducerLimit() {
        return producerLimit;
    }

    final boolean casProducerLimit(long expect, long newValue) {
        return UNSAFE.compareAndSwapLong(this, P_LIMIT_OFFSET, expect, newValue);
    }

    final void soProducerLimit(long newValue) {
        UNSAFE.putOrderedLong(this, P_LIMIT_OFFSET, newValue);
    }
}

// MpscChunkedArrayQueue是一个非定长的队列，适合无法预测队列长度的场景。
// 基于数组+链表的结构，
// 不会像链表那样分配过多的Node，吞吐量比传统的链表高。
// 数组扩容的时候，也不存在数组复制，扩容的速度，也比传统的数组快

/**
 * An MPSC array queue which starts at <i>initialCapacity</i> and grows to <i>maxCapacity</i> in linked chunks
 * of the initial size. The queue grows only when the current buffer is full and elements are not copied on
 * resize, instead a link to the new buffer is stored in the old buffer for the consumer to follow.
 */
abstract class BaseMpscLinkedArrayQueue<E> extends BaseMpscLinkedArrayQueueColdProducerFields<E>
        implements MessagePassingQueue<E>, QueueProgressIndicators {

    // 数组被生产者填满后，会填充一个JUMP，代表队列扩容了，消费者遇到JUMP会消费下一个数组。

    // No post padding here, subclasses must add
    private static final Object JUMP = new Object();

    // 消费者消费完一个完整的数组后，会将最后一个元素设为BUFFER_CONSUMED。
    private static final Object BUFFER_CONSUMED = new Object();
    private static final int CONTINUE_TO_P_INDEX_CAS = 0;
    private static final int RETRY = 1;
    private static final int QUEUE_FULL = 2;
    private static final int QUEUE_RESIZE = 3;


    /**
     * @param initialCapacity the queue initial capacity. If chunk size is fixed this will be the chunk size.
     *                        Must be 2 or more.
     */
    public BaseMpscLinkedArrayQueue(final int initialCapacity) {

        // initialCapacity必须大于等于2
        RangeUtil.checkGreaterThanOrEqual(initialCapacity, 2, "initialCapacity");

        // 容量确保是2的幂次方数，找到initialCapacity下一个2的幂次方数
        int p2capacity = Pow2.roundToPowerOfTwo(initialCapacity);
        // leave lower bit of mask clear
        // index以2为步长递增，预留一个元素存JUMP，所以limit为:(capacity-1)*2
        long mask = (p2capacity - 1) << 1;
        // need extra element to point at next array
        // 需要一个额外元素来链接下一个数组
        E[] buffer = allocateRefArray(p2capacity + 1);
        // 生产者和消费者Buffer指向同一个数组
        producerBuffer = buffer;
        producerMask = mask;
        consumerBuffer = buffer;
        consumerMask = mask;
        // 设置producerLimit = mask
        soProducerLimit(mask); // we know it's all empty to start with
    }

    @Override
    public int size() {
        // NOTE: because indices are on even numbers we cannot use the size util.

        /*
         * It is possible for a thread to be interrupted or reschedule between the read of the producer and
         * consumer indices, therefore protection is required to ensure size is within valid range. In the
         * event of concurrent polls/offers to this method the size is OVER estimated as we read consumer
         * index BEFORE the producer index.
         */
        long after = lvConsumerIndex();
        long size;
        while (true) {
            final long before = after;
            final long currentProducerIndex = lvProducerIndex();
            after = lvConsumerIndex();
            if (before == after) {
                size = ((currentProducerIndex - after) >> 1);
                break;
            }
        }
        // Long overflow is impossible, so size is always positive. Integer overflow is possible for the unbounded
        // indexed queues.
        if (size > Integer.MAX_VALUE) {
            return Integer.MAX_VALUE;
        } else {
            return (int) size;
        }
    }

    @Override
    public boolean isEmpty() {
        // Order matters!
        // Loading consumer before producer allows for producer increments after consumer index is read.
        // This ensures this method is conservative in it's estimate. Note that as this is an MPMC there is
        // nothing we can do to make this an exact method.
        return (this.lvConsumerIndex() == this.lvProducerIndex());
    }

    @Override
    public String toString() {
        return this.getClass().getName();
    }

    /**
     * ，生产数据
     * 向队列中添加一个元素e
     *
     * @param e not {@code null}, will throw NPE if it is
     * @return
     */
    @Override
    public boolean offer(final E e) {
        if (null == e) {
            throw new NullPointerException();
        }

        long mask;
        E[] buffer; //生产者指向的数组
        long pIndex; //生产索引

        while (true) {
            long producerLimit = lvProducerLimit(); // 获取生产者索引最大限制


            pIndex = lvProducerIndex();        // 获取生产者索引

            // 生产索引以2为步长递增，
            // 第0位标识为resize，所以非扩容场景，不会是奇数，
            // 扩容的时候，会在offerSlowPath()中扩容线程会将其设为奇数
            // lower bit is indicative of resize, if we see it we spin until it's cleared
            if ((pIndex & 1) == 1) {
                // 奇数代表  mpsc 正在扩容，自旋，等待扩容完成
                continue;
            }
            // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)
            // pIndex 是偶数， 实际的索引值 需要 除以2

            // mask/buffer may get changed by resizing -> only use for array access after successful CAS.

            mask = this.producerMask;
            buffer = this.producerBuffer;

            // a successful CAS ties the ordering, lv(pIndex) - [mask/buffer] -> cas(pIndex)
            // assumption behind this optimization is that queue is almost always empty or near empty
            // 阈值 producerLimit 小于等于生产者指针位置 pIndex
            // 需要扩容，
            if (producerLimit <= pIndex) {
                // 通过offerSlowPath返回状态值，来查看怎么来处理这个待添加的元素
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result) {
                    // producerLimit虽然达到了limit，
                    // 但是当前数组已经被消费了部分数据，暂时不会扩容，会使用已被消费的槽位。
                    case CONTINUE_TO_P_INDEX_CAS:
                        break;
                    // 可能由于并发原因导致CAS失败，那么则再次重新尝试添加元素
                    case RETRY:
                        continue;
                        // 队列已满，直接返回false操作
                    case QUEUE_FULL:
                        return false;
                    // 队列需要扩容操作
                    case QUEUE_RESIZE:
                        // 对队列进行直接扩容操作
                        resize(mask, buffer, pIndex, e, null);
                        return true;
                }
            }
            // 阈值 producerLimit 没有 小于等于生产者指针位置 pIndex
            // 阈值 producerLimit 大于 生产者指针位置 pIndex
            // 直接通过CAS操作对pIndex做加2处理，并且退出循环
            if (casProducerIndex(pIndex, pIndex + 2)) {
                break;
            }
        }
        // INDEX visible before ELEMENT
        final long offset = modifiedCalcCircularRefElementOffset(pIndex, mask);
        // 将buffer数组的指定位置替换为e，
        // 不是根据下标来设置的，是根据槽位的地址偏移量offset，UNSAFE操作。
        soRefElement(buffer, offset, e); // An ordered store of an element to a given offset
        return true;
    }

    /**
     * {@inheritDoc}
     * <p>
     * This implementation is correct for single consumer thread use only.
     */
    @SuppressWarnings("unchecked")

    //
    // poll()没有做并发控制，MpscQueue是多生产单消费者的Queue，只有一个消费者，这个也是netty 换成 mspcqueue的主要原因
    @Override
    public E poll() {
        final E[] buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;

        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null) {
            if (index != lvProducerIndex()) {
                // poll() == null iff queue is empty, null element is not strong enough indicator, so we must
                // check the producer index. If the queue is indeed not empty we spin until element is
                // visible.

                /*
			    offer()时生产者先CAS改producerIndex，再设置元素。	中间会有一个时间差，此时会自旋，等待元素设置完成。
			 */
                do {
                    e = lvRefElement(buffer, offset);
                }
                while (e == null);
            } else {

                //元素已经消费完
                return null;
            }
        }

        // 队列扩容了
        if (e == JUMP) {
            //获取下一个数组
            final E[] nextBuffer = nextBuffer(buffer, mask);
            //从下一个数组中消费
            return newBufferPoll(nextBuffer, index);
        }

        // 取出元素后，将原来的槽位设为null
        soRefElement(buffer, offset, null); // release element null
        // 递增consumerIndex
        soConsumerIndex(index + 2); // release cIndex
        return (E) e;
    }

    /**
     * {@inheritDoc}
     * <p>
     * This implementation is correct for single consumer thread use only.
     */
    @SuppressWarnings("unchecked")
    @Override
    public E peek() {
        final E[] buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;

        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null && index != lvProducerIndex()) {
            // peek() == null iff queue is empty, null element is not strong enough indicator, so we must
            // check the producer index. If the queue is indeed not empty we spin until element is visible.
            do {
                e = lvRefElement(buffer, offset);
            }
            while (e == null);
        }
        if (e == JUMP) {
            return newBufferPeek(nextBuffer(buffer, mask), index);
        }
        return (E) e;
    }

    /**
     * We do not inline resize into this method because we do not resize on fill.
     */
    /**
     * @param mask
     * @param pIndex        生产者索引
     * @param producerLimit 生产者limit
     * @return
     */
    private int offerSlowPath(long mask, long pIndex, long producerLimit) {
        // 消费者索引
        final long cIndex = lvConsumerIndex();
        // 数组缓冲的容量，(长度-1) * 2
        long bufferCapacity = getCurrentBufferCapacity(mask);
        // 消费索引  + 当前数组的容量 > 生产索引，代表当前数组已有部分元素被消费了，
        // 不会扩容，会使用已被消费的槽位。
        // cIndex + bufferCapacity =》 producerLimit
        if (cIndex + bufferCapacity > pIndex) {
            if (!casProducerLimit(producerLimit, cIndex + bufferCapacity)) {
                // CAS失败，自旋重试
                // retry from top
                return RETRY;
            } else {
                // continue to pIndex CAS
                // 重试  CAS修改 生产索引
                return CONTINUE_TO_P_INDEX_CAS;
            }
        }
        // full and cannot grow
        // 根据生产者和消费者索引判断Queue是否已满，无界队列永不会满
        else if (availableInQueue(pIndex, cIndex) <= 0) {
            // offer should return false;
            return QUEUE_FULL;
        }
        // grab index for resize -> set lower bit
        // CAS的方式将producerIndex加1，奇数代表正在resize
        else if (casProducerIndex(pIndex, pIndex + 1)) {
            // trigger a resize
            return QUEUE_RESIZE;
        } else {
            // failed resize attempt, retry from top
            return RETRY;
        }
    }

    /**
     * @return available elements in queue * 2
     */
    protected abstract long availableInQueue(long pIndex, long cIndex);

    @SuppressWarnings("unchecked")
    private E[] nextBuffer(final E[] buffer, final long mask) {

           /*
		     通过当前数组的最后一个元素，获取下一个待消费的数组，
		     将当前数组最后一个元素设为BUFFER_CONSUMED，代表当前数组已经消费完毕。
		 */
        final long offset = nextArrayOffset(mask);
        final E[] nextBuffer = (E[]) lvRefElement(buffer, offset);
        consumerBuffer = nextBuffer;
        consumerMask = (length(nextBuffer) - 2) << 1;
        soRefElement(buffer, offset, BUFFER_CONSUMED);
        return nextBuffer;
    }

    private static long nextArrayOffset(long mask) {
        return modifiedCalcCircularRefElementOffset(mask + 2, Long.MAX_VALUE);
    }

    private E newBufferPoll(E[] nextBuffer, long index) {
        final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
        final E n = lvRefElement(nextBuffer, offset);
        if (n == null) {
            throw new IllegalStateException("new buffer must have at least one element");
        }

        // 取出元素后，将原来的槽位设为null
        soRefElement(nextBuffer, offset, null);
        // 递增consumerIndex
        soConsumerIndex(index + 2);
        return n;
    }

    private E newBufferPeek(E[] nextBuffer, long index) {
        final long offset = modifiedCalcCircularRefElementOffset(index, consumerMask);
        final E n = lvRefElement(nextBuffer, offset);
        if (null == n) {
            throw new IllegalStateException("new buffer must have at least one element");
        }
        return n;
    }

    @Override
    public long currentProducerIndex() {
        return lvProducerIndex() / 2;
    }

    @Override
    public long currentConsumerIndex() {
        return lvConsumerIndex() / 2;
    }

    @Override
    public abstract int capacity();

    @Override
    public boolean relaxedOffer(E e) {
        return offer(e);
    }

    @SuppressWarnings("unchecked")
    @Override
    public E relaxedPoll() {
        final E[] buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;

        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == null) {
            return null;
        }
        if (e == JUMP) {
            final E[] nextBuffer = nextBuffer(buffer, mask);
            return newBufferPoll(nextBuffer, index);
        }
        soRefElement(buffer, offset, null);
        soConsumerIndex(index + 2);
        return (E) e;
    }

    @SuppressWarnings("unchecked")
    @Override
    public E relaxedPeek() {
        final E[] buffer = consumerBuffer;
        final long index = lpConsumerIndex();
        final long mask = consumerMask;

        final long offset = modifiedCalcCircularRefElementOffset(index, mask);
        Object e = lvRefElement(buffer, offset);
        if (e == JUMP) {
            return newBufferPeek(nextBuffer(buffer, mask), index);
        }
        return (E) e;
    }

    @Override
    public int fill(Supplier<E> s) {
        long result = 0;// result is a long because we want to have a safepoint check at regular intervals
        final int capacity = capacity();
        do {
            final int filled = fill(s, PortableJvmInfo.RECOMENDED_OFFER_BATCH);
            if (filled == 0) {
                return (int) result;
            }
            result += filled;
        }
        while (result <= capacity);
        return (int) result;
    }

    @Override
    public int fill(Supplier<E> s, int limit) {
        if (null == s)
            throw new IllegalArgumentException("supplier is null");
        if (limit < 0)
            throw new IllegalArgumentException("limit is negative:" + limit);
        if (limit == 0)
            return 0;

        long mask;
        E[] buffer;
        long pIndex;
        int claimedSlots;
        while (true) {
            long producerLimit = lvProducerLimit();
            pIndex = lvProducerIndex();
            // lower bit is indicative of resize, if we see it we spin until it's cleared
            if ((pIndex & 1) == 1) {
                continue;
            }
            // pIndex is even (lower bit is 0) -> actual index is (pIndex >> 1)

            // NOTE: mask/buffer may get changed by resizing -> only use for array access after successful CAS.
            // Only by virtue offloading them between the lvProducerIndex and a successful casProducerIndex are they
            // safe to use.
            mask = this.producerMask;
            buffer = this.producerBuffer;
            // a successful CAS ties the ordering, lv(pIndex) -> [mask/buffer] -> cas(pIndex)

            // we want 'limit' slots, but will settle for whatever is visible to 'producerLimit'
            long batchIndex = Math.min(producerLimit, pIndex + 2l * limit); //  -> producerLimit >= batchIndex

            if (pIndex >= producerLimit) {
                int result = offerSlowPath(mask, pIndex, producerLimit);
                switch (result) {
                    case CONTINUE_TO_P_INDEX_CAS:
                        // offer slow path verifies only one slot ahead, we cannot rely on indication here
                    case RETRY:
                        continue;
                    case QUEUE_FULL:
                        return 0;
                    case QUEUE_RESIZE:
                        resize(mask, buffer, pIndex, null, s);
                        return 1;
                }
            }

            // claim limit slots at once
            if (casProducerIndex(pIndex, batchIndex)) {
                claimedSlots = (int) ((batchIndex - pIndex) / 2);
                break;
            }
        }

        for (int i = 0; i < claimedSlots; i++) {
            final long offset = modifiedCalcCircularRefElementOffset(pIndex + 2l * i, mask);
            soRefElement(buffer, offset, s.get());
        }
        return claimedSlots;
    }

    @Override
    public void fill(Supplier<E> s, WaitStrategy wait, ExitCondition exit) {
        MessagePassingQueueUtil.fill(this, s, wait, exit);
    }

    @Override
    public int drain(Consumer<E> c) {
        return drain(c, capacity());
    }

    @Override
    public int drain(Consumer<E> c, int limit) {
        return MessagePassingQueueUtil.drain(this, c, limit);
    }

    @Override
    public void drain(Consumer<E> c, WaitStrategy wait, ExitCondition exit) {
        MessagePassingQueueUtil.drain(this, c, wait, exit);
    }

    /**
     * Get an iterator for this queue. This method is thread safe.
     * <p>
     * The iterator provides a best-effort snapshot of the elements in the queue.
     * The returned iterator is not guaranteed to return elements in queue order,
     * and races with the consumer thread may cause gaps in the sequence of returned elements.
     * Like {link #relaxedPoll}, the iterator may not immediately return newly inserted elements.
     *
     * @return The iterator.
     */
    @Override
    public Iterator<E> iterator() {
        return new WeakIterator(consumerBuffer, lvConsumerIndex(), lvProducerIndex());
    }

    private static class WeakIterator<E> implements Iterator<E> {
        private final long pIndex;
        private long nextIndex;
        private E nextElement;
        private E[] currentBuffer;
        private int mask;

        WeakIterator(E[] currentBuffer, long cIndex, long pIndex) {
            this.pIndex = pIndex >> 1;
            this.nextIndex = cIndex >> 1;
            setBuffer(currentBuffer);
            nextElement = getNext();
        }

        @Override
        public void remove() {
            throw new UnsupportedOperationException("remove");
        }

        @Override
        public boolean hasNext() {
            return nextElement != null;
        }

        @Override
        public E next() {
            final E e = nextElement;
            if (e == null) {
                throw new NoSuchElementException();
            }
            nextElement = getNext();
            return e;
        }

        private void setBuffer(E[] buffer) {
            this.currentBuffer = buffer;
            this.mask = length(buffer) - 2;
        }

        private E getNext() {
            while (nextIndex < pIndex) {
                long index = nextIndex++;
                E e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask));
                // skip removed/not yet visible elements
                if (e == null) {
                    continue;
                }

                // not null && not JUMP -> found next element
                if (e != JUMP) {
                    return e;
                }

                // need to jump to the next buffer
                int nextBufferIndex = mask + 1;
                Object nextBuffer = lvRefElement(currentBuffer,
                        calcRefElementOffset(nextBufferIndex));

                if (nextBuffer == BUFFER_CONSUMED || nextBuffer == null) {
                    // Consumer may have passed us, or the next buffer is not visible yet: drop out early
                    return null;
                }

                setBuffer((E[]) nextBuffer);
                // now with the new array retry the load, it can't be a JUMP, but we need to repeat same index
                e = lvRefElement(currentBuffer, calcCircularRefElementOffset(index, mask));
                // skip removed/not yet visible elements
                if (e == null) {
                    continue;
                } else {
                    return e;
                }

            }
            return null;
        }
    }

    // 扩容:
// 新建一个E[]，将oldBuffer和newBuffer建立连接。
    //将producerBuffer指向新数组，然后将元素e放到新数组中，
// 旧元素的最后一个元素指向新数组，形成链表。
// 还会将旧元素的槽位填充JUMP元素，代表队列扩容了。
    private void resize(long oldMask, E[] oldBuffer, long pIndex, E e, Supplier<E> s) {
        assert (e != null && s == null) || (e == null || s != null);

        // 下一个Buffer的长度，MpscQueue会构建一个相同长度的Buffer
        int newBufferLength = getNextBufferSize(oldBuffer);
        final E[] newBuffer;
        try {
            // 创建一个新的E[]

            newBuffer = allocateRefArray(newBufferLength);
        } catch (OutOfMemoryError oom) {
            assert lvProducerIndex() == pIndex + 1;
            soProducerIndex(pIndex);
            throw oom;
        }

        // 生产者Buffer指向新的E[]
        producerBuffer = newBuffer;
        // 计算新的Mask，Buffer长度不变的情况下，Mask也不变
        final int newMask = (newBufferLength - 2) << 1;
        producerMask = newMask;

        // 根据该偏移量设置oldBuffer的JUMP元素，会递增然后重置，不断循环
        final long offsetInOld = modifiedCalcCircularRefElementOffset(pIndex, oldMask);
        // Mask不变的情况下，oldBuffer的JUMP对应的位置，就是newBuffer中要消费的位置.
        final long offsetInNew = modifiedCalcCircularRefElementOffset(pIndex, newMask);

        // 元素e放到新数组中
        soRefElement(newBuffer, offsetInNew, e == null ? s.get() : e);// element in new array
        // 旧数组和新数组建立连接，旧数组的最后一个元素保存新数组的地址。
        soRefElement(oldBuffer, nextArrayOffset(oldMask), newBuffer);// buffer linked

        // ASSERT code
        final long cIndex = lvConsumerIndex();
        final long availableInQueue = availableInQueue(pIndex, cIndex);
        RangeUtil.checkPositive(availableInQueue, "availableInQueue");

        // Invalidate racing CASs
        // We never set the limit beyond the bounds of a buffer
        soProducerLimit(pIndex + Math.min(newMask, availableInQueue));

        // make resize visible to the other producers
        soProducerIndex(pIndex + 2);

        // INDEX visible before ELEMENT, consistent with consumer expectation

        // make resize visible to consumer
        soRefElement(oldBuffer, offsetInOld, JUMP);
    }

    /**
     * @return next buffer size(inclusive of next array pointer)
     */
    protected abstract int getNextBufferSize(E[] buffer);

    /**
     * @return current buffer capacity for elements (excluding next pointer and jump entry) * 2
     */
    protected abstract long getCurrentBufferCapacity(long mask);
}
